In [1]:
%matplotlib inline
from __future__ import print_function
import numpy as np
from six.moves import range
import keras.backend as K
from keras.models import Model, Sequential
from keras.engine.training import slice_X
from keras.layers import Lambda, Flatten, Permute, Reshape, Input
from keras.layers import merge, Merge, recurrent
from keras.layers import Activation, TimeDistributed, Dense, RepeatVector
from sklearn.metrics import accuracy_score
import matplotlib.pyplot as plt
import pylab as pl
import matplotlib.cm as cm
from scipy.special import expit
In [2]:
class CharacterTable(object):
    '''
    Given a set of characters:
    + Encode them to a one-hot integer representation
    + Decode the one-hot integer representation to their character output
    + Decode a vector of probabilities to their character output
    '''
    def __init__(self, chars, maxlen):
        self.chars = sorted(set(chars))
        self.char_indices = dict((c, i) for i, c in enumerate(self.chars))
        self.indices_char = dict((i, c) for i, c in enumerate(self.chars))
        self.maxlen = maxlen

    def encode(self, C, maxlen=None):
        maxlen = maxlen if maxlen else self.maxlen
        X = np.zeros((maxlen, len(self.chars)))
        for i, c in enumerate(C):
            X[i, self.char_indices[c]] = 1
        return X

    def decode(self, X, calc_argmax=True):
        if calc_argmax:
            X = X.argmax(axis=-1)
        return ''.join(self.indices_char[x] for x in X)
In [3]:
class colors:
    ok = '\033[92m'
    fail = '\033[91m'
    close = '\033[0m'
In [30]:
# Parameters for the model and dataset
TRAINING_SIZE = 100000
DIGITS = 5
OPS = 2
INVERT = True
# Try replacing LSTM with GRU or SimpleRNN
RNN = recurrent.LSTM
HIDDEN_SIZE = 16
BATCH_SIZE = 128
LAYERS = 1
MAXLEN = OPS * DIGITS + OPS - 1  # e.g. 2 * 5 + 2 - 1 = 11: two 5-digit operands joined by one '+'
In [31]:
chars = '0123456789+ '
ctable = CharacterTable(chars, MAXLEN)
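A quick round-trip through CharacterTable makes the encoding concrete; this cell is purely illustrative and was not part of the original run:
In [ ]:
# Illustrative sanity check: encode a space-padded query and decode it back
q = '12+34'.ljust(MAXLEN)
x = ctable.encode(q)
print(x.shape)                 # (MAXLEN, len(chars)) == (11, 12): one one-hot row per character
print(repr(ctable.decode(x)))  # '12+34      '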
In [32]:
def generate_data(training_size, num_digits, num_ops):
    questions = []
    expected = []
    seen = set()
    print('Generating data... ')
    while len(questions) < training_size:
        # Variable-length alternative:
        # f = lambda: int(''.join(np.random.choice(list('0123456789')) for i in range(np.random.randint(1, num_digits + 1))))
        f = lambda: int(''.join(np.random.choice(list('0123456789')) for i in range(num_digits)))
        ops = []
        for i in range(num_ops):
            ops.append(f())
        # Skip any addition questions we've already seen
        # (sorting ops would also collapse commutative duplicates X+Y == Y+X, but is disabled here)
        # ops.sort()
        key = tuple(ops)
        if key in seen:
            continue
        seen.add(key)
        # Right-justify each operand with spaces so the query is always MAXLEN characters
        ops_str = []
        format_str = '{:>' + str(num_digits) + '}'
        for op in ops:
            op_str = format_str.format(str(op))
            ops_str.append(op_str)
        q = '+'.join(ops_str)
        query = q + ' ' * (MAXLEN - len(q))
        ans = str(sum(ops))
        # Answers are at most num_digits + 1 characters long (for two operands)
        if INVERT:
            query = query[::-1]
            ans = ans[::-1]
        ans += ' ' * (num_digits + 1 - len(ans))
        questions.append(query)
        expected.append(ans)
    # print(len(questions))
    print('Total addition questions:', len(questions))
    return questions, expected
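The query format is fixed-width: each operand is right-justified to num_digits and operands are joined by '+'. An illustrative cell with hand-picked (non-random) operands:
In [ ]:
# Illustrative: the fixed-width query format for two hand-picked operands
ops = [123, 45678]
q = '+'.join('{:>{w}}'.format(op, w=DIGITS) for op in ops)
print(repr(q))        # '  123+45678' -- always MAXLEN == 11 characters
print(repr(q[::-1]))  # what the network actually sees when INVERT is True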
In [33]:
def create_train_valid(questions, expected, num_digits, num_ops, percentage):
    print('Vectorization...')
    X = np.zeros((len(questions), MAXLEN, len(chars)), dtype=np.bool)
    y = np.zeros((len(questions), num_digits + 1, len(chars)), dtype=np.bool)
    for i, sentence in enumerate(questions):
        X[i] = ctable.encode(sentence, maxlen=MAXLEN)
    for i, sentence in enumerate(expected):
        y[i] = ctable.encode(sentence, maxlen=num_digits + 1)
    # Shuffle (X, y) in unison as the later parts of X will almost all be larger digits
    indices = np.arange(len(y))
    np.random.shuffle(indices)
    X = X[indices]
    y = y[indices]
    # Explicitly set apart `percentage` of the data as validation data we never train on
    split_at = len(X) - int(len(X) * percentage)
    (X_train, X_val) = (slice_X(X, 0, split_at), slice_X(X, split_at))
    (y_train, y_val) = (y[:split_at], y[split_at:])
    print(X_train.shape)
    print(y_train.shape)
    return X_train, y_train, X_val, y_val
In [34]:
questions, expected = generate_data(TRAINING_SIZE, DIGITS, OPS)
X_train, y_train, X_val, y_val = create_train_valid(questions, expected, DIGITS, OPS, 0.5)
In [45]:
questions[0][::-1]
Out[45]:
In [36]:
expected[0]
Out[36]:
In [41]:
def standard_seq2seq_model(hidden_size, num_layers, num_digits, num_ops):
    # The simplest seq2seq model: a plain encoder-decoder
    print('Build model...')
    model = Sequential()
    # "Encode" the input sequence using an RNN, producing a single output of size HIDDEN_SIZE
    # note: in a situation where your input sequences have a variable length,
    # use input_shape=(None, nb_feature).
    encoder = RNN(hidden_size, input_shape=(MAXLEN, len(chars)))
    model.add(encoder)
    # For the decoder's input, we repeat the encoded vector for each output time step
    model.add(RepeatVector(num_digits + 1))
    # The decoder RNN can be a single layer or multiple stacked layers
    for _ in range(num_layers):
        decoder = RNN(hidden_size, return_sequences=True)
        model.add(decoder)
    # For each step of the output sequence, decide which character should be chosen
    mapper = TimeDistributed(Dense(len(chars)))
    model.add(mapper)
    model.add(Activation('softmax'))
    model.compile(loss='categorical_crossentropy',
                  optimizer='adam',
                  metrics=['accuracy'])
    # Backend functions for probing intermediate activations
    # (decoder_f taps the last decoder layer in the stack)
    inputs = [K.learning_phase()] + model.inputs
    encoder_f = K.function(inputs, [encoder.output])
    decoder_f = K.function(inputs, [decoder.output])
    mapper_f = K.function(inputs, [mapper.output])
    return model, encoder_f, decoder_f, mapper_f, encoder, decoder, mapper
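Before training, it can help to see the layer stack this builds; a throwaway illustrative cell (it constructs a second, untrained copy of the model):
In [ ]:
# Illustrative: build a copy of the model and print its layer stack
# (encode -> repeat -> decode -> per-step softmax, as in the comments above)
m, _, _, _, _, _, _ = standard_seq2seq_model(HIDDEN_SIZE, LAYERS, DIGITS, OPS)
m.summary()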
In [43]:
import theano.tensor as T

def get_last_Y(X):
    return X[:, -1, :]

def get_Y(X, xmaxlen):
    return X[:, :xmaxlen, :]  # get first xmaxlen elems from the time dimension

def get_R(X):
    Y, alpha = X[0], X[1]
    ans = T.batched_dot(Y, alpha)
    return ans

def get_R_shape(input_shape):
    shape = list(input_shape)
    outshape = (shape[0][0], shape[0][1])
    return tuple(outshape)

def stack_decoder_input(X):
    # Stack the per-step context vectors along the time axis -> (None, num_digits + 1, hidden_size)
    ans = K.concatenate(X, axis=1)
    return ans

def stack_decoder_input_shape(input_shape):
    shape = list(input_shape)
    outshape = (shape[0][0], len(shape), shape[0][2])
    return tuple(outshape)

def attentional_seq2seq_model(hidden_size, num_layers, num_digits, num_ops, chars):
    main_input = Input(shape=(MAXLEN, len(chars)), name='main_input')
    encoder = RNN(hidden_size,
                  input_shape=(MAXLEN, len(chars)),
                  return_sequences=True)(main_input)
    Y = Lambda(get_Y, arguments={"xmaxlen": MAXLEN}, name="Y", output_shape=(MAXLEN, hidden_size))(encoder)
    Y_trans = Permute((2, 1), name="y_trans")(Y)  # of shape (None, hidden_size, MAXLEN)
    # Input_trans = Permute((2, 1), name="input_trans")(main_input)
    r_array = []
    # One attention head per output character position
    for idx in range(num_digits + 1):
        WY = TimeDistributed(Dense(len(chars)), name="WY_" + str(idx))(Y)
        M = Activation('tanh', name="M_" + str(idx))(WY)
        alpha_ = TimeDistributed(Dense(1, activation='linear'), name="alpha_" + str(idx))(M)
        flat_alpha = Flatten(name="flat_alpha_" + str(idx))(alpha_)
        alpha = Dense(MAXLEN, activation='softmax', name="alpha" + str(idx))(flat_alpha)
        # Context vector r = Y^T alpha: an attention-weighted sum of encoder states
        r_ = merge([Y_trans, alpha], output_shape=get_R_shape, name="r_" + str(idx), mode=get_R)
        r = Reshape((1, hidden_size))(r_)
        r_array.append(r)
    decoder_input = merge(r_array, mode=stack_decoder_input, output_shape=stack_decoder_input_shape)
    decoded_result = RNN(hidden_size, input_shape=(num_digits + 1, hidden_size), return_sequences=True)(decoder_input)
    mapping = TimeDistributed(Dense(len(chars)))(decoded_result)
    out = Activation('softmax')(mapping)
    model = Model(input=[main_input], output=out)
    model.compile(loss='categorical_crossentropy',
                  optimizer='adam',
                  metrics=['accuracy'])
    # Backend functions exposing the attention weights for output steps 1-3
    inputs = [K.learning_phase()] + model.inputs
    a1 = model.get_layer('alpha1')
    a2 = model.get_layer('alpha2')
    a3 = model.get_layer('alpha3')
    alpha1_f = K.function(inputs, [a1.output])
    alpha2_f = K.function(inputs, [a2.output])
    alpha3_f = K.function(inputs, [a3.output])
    return model, alpha1_f, alpha2_f, alpha3_f, a1, a2, a3
In [38]:
def learning(model, X_train, y_train, iterations, X_val, y_val):
    # Decoded reference strings for the validation set (handy for eyeballing predictions)
    y_true = []
    for idx in range(y_val.shape[0]):
        y_true.append(ctable.decode(y_val[idx]))
    training_obj = model.fit(X_train, y_train, batch_size=BATCH_SIZE, nb_epoch=iterations,
                             validation_data=(X_val, y_val))
    return training_obj
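Since learning() now returns the fit History, the loss curves can be plotted. A minimal sketch, assuming `fitted` (a hypothetical name) holds the return value of one of the learning(...) calls below:
In [ ]:
# Illustrative: plot training vs. validation loss from a Keras History object
# (`fitted` is assumed to be e.g. fitted = learning(std_model, X_train, y_train, 200, X_val, y_val))
plt.plot(fitted.history['loss'], label='train')
plt.plot(fitted.history['val_loss'], label='validation')
plt.xlabel('epoch')
plt.ylabel('categorical cross-entropy')
plt.legend()
plt.show()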
In [ ]:
std_model, encoder_f, decoder_f, mapper_f, encoder, decoder, mapper = standard_seq2seq_model(HIDDEN_SIZE, LAYERS, DIGITS, OPS)
# val_acc_2_2 = learning(model, X_train, y_train, 100, X_val, y_val)
learning(std_model, X_train, y_train, 200, X_val, y_val)
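With the standard model trained, the backend functions returned above can probe intermediate activations; a minimal sketch (0 selects the test phase for K.learning_phase(); the bool-typed inputs are cast by hand since raw backend functions do not cast for us):
In [ ]:
# Illustrative: the encoder compresses each MAXLEN-step query into a single vector
enc_out = encoder_f([0] + [X_val[:1].astype('float32')])[0]
print(enc_out.shape)  # expected: (1, HIDDEN_SIZE)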
In [44]:
att_model, alpha1_f, alpha2_f, alpha3_f, alpha1, alpha2, alpha3 = attentional_seq2seq_model(HIDDEN_SIZE, LAYERS, DIGITS, OPS, chars)
# val_acc_2_2 = learning(model, X_train, y_train, 100, X_val, y_val)
learning(att_model, X_train, y_train, 100, X_val, y_val)
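The alpha<idx> layers expose where the model attends for each output character; a visualization sketch using the backend functions returned above (illustrative, assuming att_model has been trained):
In [ ]:
# Illustrative: heat-map of attention over input positions for a few validation
# examples; alpha1_f returns weights of shape (batch, MAXLEN) for output step 1
alphas = alpha1_f([0] + [X_val[:8].astype('float32')])[0]
plt.imshow(alphas, interpolation='nearest', cmap=cm.viridis)
plt.xlabel('input position (reversed, since INVERT=True)')
plt.ylabel('validation example')
plt.title('attention weights, output step 1')
plt.show()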
In [28]:
# Build the test query in the same right-justified, fixed-width format as the training data
format_str = '{:>' + str(DIGITS) + '}'
X_str = format_str.format('13') + '+' + format_str.format('21')
X_str = X_str[::-1]
print(X_str)
X = ctable.encode(X_str, maxlen=MAXLEN).reshape((1, MAXLEN, len(chars)))
preds = std_model.predict(X, verbose=0)
y_hat = preds[0].argmax(axis=-1)
y_str = ''.join(ctable.indices_char[x] for x in y_hat)
print(y_str)
preds2 = att_model.predict(X, verbose=0)
y_hat2 = preds2[0].argmax(axis=-1)
y_str2 = ''.join(ctable.indices_char[x] for x in y_hat2)
print(y_str2)
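Because the generator reverses both query and answer when INVERT is True, the raw predictions above are reversed and space-padded; undoing that gives the human-readable sums (illustrative follow-up):
In [ ]:
# Illustrative: undo the INVERT transform and strip padding; both should read '34'
print('standard :', y_str[::-1].strip())
print('attention:', y_str2[::-1].strip())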
In [ ]: